Java NIO - 基于 Netty 单体 IM 系统项目实践

项目介绍

该项目是一款基于 Netty 框架开发的高性能、轻量级即时通讯(IM)系统。它采用 Protobuf 作为序列化协议,旨在提供一个稳定、可扩展的双工通信架构,可以作为即时通讯的基础底座。


主要功能

  • 实时双工通信:基于 TCP 长连接,实现毫秒级的消息收发延迟。
  • 身份认证系统:支持用户登录/注销流程,确保每个连接的合法性。
  • 私聊路由 (P2P):服务端根据 UID 自动寻址,精准将消息投递至目标 Channel。
  • 心跳与链路保活:内置心跳检测机制,能够自动剔除死链接(僵尸连接)。
  • 异常自愈:客户端具备完善的断线监听与自动重连逻辑。
  • 控制台交互:提供人性化的命令行菜单,支持指令式操作。


组件架构

以下是 IM 单体项目的核心架构和组件全景图:

Client (客户端)

UI/Console 层
  • CommandController: 流程调度中心,管理 connectLock 同步锁
  • ConsoleCommand: 指令解析系统 (Login/Chat/Logout)
  • ClientSession: 本地会话管理,绑定 UID 与 Channel
Sender 业务层 (Outbound)

职责:封装 Protobuf 协议包并执行发送

LoginSender ChatSender LogoutSender
Netty Pipeline (Inbound)
Decoder: 字节流转 Protobuf POJO
LoginResponseHandler: 登录成功即自销毁
-- 登录成功后动态注入逻辑 --
HeartBeatClientHandler: 周期性探活
ChatMsgHandler: 实时消息接收
ExceptionHandler: 异常捕获与重连引导
TCP / Protobuf
[1] 身份认证握手
[2] 心跳维持(30s)
[3] P2P 消息路由
[4] 优雅断开连接

Server (服务端)

SessionMap 寻址中心

全局 ConcurrentHashMap:记录 UID ⇄ Channel。通过该 Map 实现跨通道消息精准转发。

Netty Pipeline (Handler 链)
Encoder: 对象序列化为字节流
LoginRequestHandler: 权限与 Token 校验
HeartBeatServerHandler: 读超时自动断开
ChatRedirectHandler: 核心路由与转发
Business Processor (异步处理)

解耦 IO 线程,处理耗时业务

LoginProcessor LogoutProcessor

IM系统架构与组件协作总结:

  • 连接同步:CommandController 利用 wait/notify 机制,使控制台线程在Netty异步连接结果返回前保持阻塞。
  • 状态管理:客户端登录成功后通过 pipeline.remove/addAfter 实现处理器动态热插拔。
  • 消息转发:服务端拦截ChatRequest,通过SessionMap检索目标UID关联的Channel并执行 writeAndFlush

客户端:

  • CommandController: 负责同步用户输入与网络层异步回调。
  • Sender 族 (Login/Chat/Logout): 负责将业务 POJO 封装为符合协议规范的字节流。
  • Pipeline 处理器:
    • LoginResponseHandler: 状态转换守卫,登录成功后执行 Pipeline 热插拔。
    • ChatMsgHandler: 监听入站消息并渲染到控制台。

服务端:

  • SessionMap: 全局并发哈希表,维护 UID ⇄ Channel 映射,是消息分发的心脏。
  • RedirectHandler: 消息路由中心,负责解析目标地址并完成跨通道路由。
  • Business Processors: 独立于 IO 线程的业务执行单元,处理耗时逻辑(如登录校验)。


客户端原理和流程

  • 启动与同步连接:当程序启动时,控制台线程通过 connectLock.wait() 进入阻塞状态。Netty 的 EventLoop 线程在物理链路建立成功后,通过回调触发 notifyAll(),从而唤醒控制台线程。解决了 Netty 异步连接导致的“命令先于连接发送”的竞态问题。

  • 状态机切换 :项目采用了“热插拔”设计。

    • 初始化态:Pipeline 仅包含基础的编码器和登录处理器。
    • 业务态:一旦 LoginResponse 返回成功,客户端 addAfter 注入心跳和聊天处理器,并立即 remove 登录处理器。
  • 消息寻址模型

    • 发送:Client A 发送 ChatRequest,包含 toUserId = B。
    • 路由:Server 接收后,在 SessionMap 中通过 B 找到其对应的 ChannelB。
    • 投递:调用 ChannelB.writeAndFlush()。


其他说明

环境依赖:JDK 17+、Netty 4.x、Protobuf 3.x

启动说明

  • Step 1: 启动服务端 ChatNettyServer,默认监听指定端口。
  • Step 2: 启动客户端 ChatNettyClient。
  • Step 3: 进入控制台,根据菜单指令进行操作:
    • 输入 1 进行登录。
    • 输入 2 发起聊天(需指定对方 UID)。
    • 输入 10 退出登录并断开连接。


单体IM系统源码

父模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.demo</groupId>
<artifactId>im-solo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>4.28.2</protobuf.version>
<netty.version>4.2.12.Final</netty.version>
<guava.version>33.4.0-jre</guava.version>
<spring-boot.version>3.5.13</spring-boot.version>
<logback.version>1.5.25</logback.version>
<lombok.version>1.18.44</lombok.version>
</properties>

<modules>
<module>im-client</module>
<module>im-server</module>
<module>im-common</module>
</modules>

<dependencyManagement>
<dependencies>
<!--spring boot pom 版本字典(置顶)-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.18.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.18.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>


common 模块

依赖配置

重点是需要引入 protobuf 需要的依赖和插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.demo</groupId>
<artifactId>im-solo</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>im-common</artifactId>

<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.60</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.12.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<!--logback-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protoSourceRoot>src/main/proto</protoSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<!--绑定到 compile 阶段,每次你运行 mvn compile,它都会自动扫描 ${protoSourceRoot} 目录下的 .proto 文件,并生成对应的 Java 类-->
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


定义数据协议

src/main/proto/config/msg.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
syntax = "proto3";
package com.owlias.im.common.core.bean.msg;

/*消息枚举类型*/
enum HeadType {
LOGIN_REQUEST = 0; // 登录请求
LOGIN_RESPONSE = 1; // 登录相应
LOGOUT_REQUEST = 2; // 退出请求
LOGOUT_RESPONSE = 3; // 退出响应
HEART_BEAT = 4; // 心跳
MESSAGE_REQUEST = 5; // 聊天请求
MESSAGE_RESPONSE = 6; // 聊天响应
MESSAGE_NOTIFICATION = 7; // 通知
}

/*登录请求*/
message LoginRequest {
string uid = 1; // 用户唯一id
string deviceId = 2; // 设备ID
string token = 3; // 用户token
uint32 platform = 4; // 客户端平台 windows、mac、android、ios、web(服务端可以根据 platform 实现“同平台互踢”或“多端同时在线”的逻辑。)
string appVersion = 5; // APP 版本号(如果某次更新改变了协议,服务端可以根据 appVersion 给旧版本客户端返回特定的 code 提示升级。)
}

/*登录响应*/
message LoginResponse {
bool result = 1; // true表示发送成功,false表示发送失败
uint32 code = 2; // 错误码
string info = 3; // 错误描述
uint32 expose = 4; // 错误描述是否提示给用户: 1 提示、0 不提示(后端可以直接通过这个开关决定前端是否弹框,避免了前端硬编码错误提示)
}

/*聊天请求*/
message MessageRequest {
uint64 msgId = 1;
string from = 2;
string to = 3;
uint64 time = 4;
uint32 msgType = 5;
string content = 6;
string url = 8;
string property = 9;
string fromNick = 10;
string json = 11;
}

/*聊天响应*/
message MessageResponse {
bool result = 1;
uint32 code = 2;
string info = 3;
uint32 expose = 4;
}

/*通知*/
message MessageNotification {
uint64 noId = 1;
string json = 2;
string timestamp = 3;
}

/*心跳*/
message MessageHeartBeat {
uint32 seq = 1;
string uid = 2;
string json = 3;
}

/*顶层消息:信封模式设计*/
message Message {
HeadType type = 1; // 通用字段: 消息类型
uint64 sequence = 2; // 通用字段:消息序列号(解决消息 “不丢、不乱” 的基石)
string sessionId = 3; // 通用字段:会话id(用于异步请求的响应匹配 Request-Response Matching)
oneof body { // 语法层面保证在一个包里只存在一种具体的业务对象,节省空间
LoginRequest loginRequest = 4; // 登录请求
LoginResponse loginResponse = 5; // 登录响应
MessageRequest messageRequest = 6; // IM消息请求
MessageResponse messageResponse = 7; // IM消息响应
MessageNotification notification = 8; // 系统通知
MessageHeartBeat heartBeat = 9; // 心跳
}
}


并发相关的设计

CallbackTask

1
2
3
4
5
public interface CallbackTask<R> {
R execute() throws Exception;
void onBack(R r);
void onException(Throwable t);
}

ExecuteTask

1
2
3
public interface ExecuteTask {
void execute();
}

FutureTaskScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
import com.owlias.im.common.utils.ThreadUtil;
import java.util.concurrent.ThreadPoolExecutor;

public class FutureTaskScheduler {
static final ThreadPoolExecutor mixPool;
static {
mixPool = ThreadUtil.getMixedTargetThreadPool();
}

public static void add(ExecuteTask executeTask) {
mixPool.submit(executeTask::execute);
}
}

CallbackTaskScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.google.common.util.concurrent.*;
import com.owlias.im.common.utils.ThreadUtil;
import java.util.concurrent.ExecutorService;

public class CallbackTaskScheduler {
static ListeningExecutorService guavaPool;
static {
ExecutorService jPool = ThreadUtil.getMixedThreadPoolInstance();
guavaPool = MoreExecutors.listeningDecorator(jPool);
}

public static <R> void add(CallbackTask<R> executeTask) {
ListenableFuture<R> future = guavaPool.submit(executeTask::execute);

Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(R r) {
executeTask.onBack(r);
}

public void onFailure(Throwable t) {
executeTask.onException(t);
}
}, guavaPool);
}
}

ThreadUtils,请参考 《Java 工具类之线程相关》


POJO类

ChatMsg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import com.owlias.im.common.core.bean.msg.Msg;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;

/**
* 提供POJO与底层协议之间的转换桥梁。
*/
@Data
public class ChatMsg {

private User user;
private long msgId;
private String from;
private String to;
private long time;
private MSG_TYPE msgType;
private String content;
private String url; // 多媒体地址
private String property; // 附加属性
private String fromNick; // 发送者昵称
private String json; // 附加的json串

// 消息类型 1:纯文本 2:音频 3:视频 4:地理位置 5:其他
public enum MSG_TYPE {
TEXT,
AUDIO,
VIDEO,
POS,
OTHER;
}

public ChatMsg(User user) {
if (null == user) {
return;
}
this.user = user;
this.setTime(System.currentTimeMillis());
this.setFrom(user.getUid());
this.setFromNick(user.getNickName());
}

/**
* 模型映射:它将 Java 对象中的属性平铺到 Protobuf 的 Builder 中
*/
public void fillMsg(Msg.MessageRequest.Builder cb) {
if (msgId > 0) {
cb.setMsgId(msgId);
}
if (StringUtils.isNotEmpty(from)) {
cb.setFrom(from);
}
if (StringUtils.isNotEmpty(to)) {
cb.setTo(to);
}
if (time > 0) {
cb.setTime(time);
}
if (msgType != null) {
cb.setMsgType(msgType.ordinal());
}
if (StringUtils.isNotEmpty(content)) {
cb.setContent(content);
}
if (StringUtils.isNotEmpty(url)) {
cb.setUrl(url);
}
if (StringUtils.isNotEmpty(property)) {
cb.setProperty(property);
}
if (StringUtils.isNotEmpty(fromNick)) {
cb.setFromNick(fromNick);
}

if (StringUtils.isNotEmpty(json)) {
cb.setJson(json);
}
}
}

User

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
@Data
public class User {

private static final AtomicInteger NO = new AtomicInteger(1);
public enum PLAT_TYPE {
WINDOWS, MAC, ANDROID, IOS, WEB, OTHER;
}

String uid = String.valueOf(NO.getAndIncrement()); // 演示环境的默认值(可以接收客户端传来的uid)
String devId = UUID.randomUUID().toString();
String token = UUID.randomUUID().toString();
String nickName = "nickName";
PLAT_TYPE platform = PLAT_TYPE.MAC;
private String sessionId;

public void setPlatform(int platform) {
PLAT_TYPE[] values = PLAT_TYPE.values();
this.platform = (platform >= 0 && platform < values.length) ? values[platform] : PLAT_TYPE.OTHER;
}

@Override
public String toString() {
return "User{" +
"uid='" + uid + '\'' +
", devId='" + devId + '\'' +
", token='" + token + '\'' +
", nickName='" + nickName + '\'' +
", platform=" + platform +
'}';
}

/**
* 将原始的二进制登录请求对象转化为内存中的User对象。
*/
public static User fromLoginMsg(Msg.LoginRequest info) {
User user = new User();
user.uid = info.getUid();
user.devId = info.getDeviceId();
user.token = info.getToken();
user.setPlatform(info.getPlatform());
log.info("登录中: {}", user);
return user;
}
}


解码和编码器

SimpleProtobufDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import com.google.protobuf.InvalidProtocolBufferException;
import com.owlias.im.common.core.ProtoInstant;
import com.owlias.im.common.core.bean.msg.Msg;
import com.owlias.im.common.core.ex.InvalidFrameException;
import com.owlias.im.common.utils.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

/**
* @author KJ
* @description 解码器
*/
@Slf4j
public class SimpleProtobufDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object outMsg = decode0(ctx, in);
if (outMsg != null) {
// 获取业务消息
out.add(outMsg);
}
}

public static Object decode0(ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {
// 标记一下当前的 readIndex 的位置,与 resetReaderIndex 配置使用,处理TCP粘包与半包
in.markReaderIndex();
// 判断包头长度
if (in.readableBytes() < 8) {// 不够包头
return null;
}
// 读取魔数
short magic = in.readShort();
if (magic != ProtoInstant.MAGIC_CODE) {
String error = "客户端口令不对: " + ctx.channel().remoteAddress();
// 异常连接,直接报错,关闭连接
throw new InvalidFrameException(error);
}
// 读取版本
short version = in.readShort();
if (version != ProtoInstant.VERSION_CODE) {
String error = "协议的版本不对: " + ctx.channel().remoteAddress();
// 异常连接,直接报错,关闭连接
throw new InvalidFrameException(error);
}
// 读取传送过来的消息的长度。
int length = in.readInt();
// 长度如果小于0
if (length < 0) {
// 非法数据,关闭连接
ctx.close();
}
// 读到的消息体长度如果小于传送过来的消息长度
if (in.readableBytes() < length) {
// 重置读取位置
in.resetReaderIndex();
return null;
}
Logger.cfo("decoder length=" + in.readableBytes());
/*byte[] array;
if (in.hasArray()) {
array = new byte[length];
in.readBytes(array, 0, length);
} else {
// 直接缓冲
array = new byte[length];
in.readBytes(array, 0, length);
}*/
byte[] array = new byte[length];
in.readBytes(array); // 直接 readBytes 即可,Netty 会处理堆内/堆外内存的拷贝

// 攒够了一个完整的包,才真正进行 parseFrom 字节转成对象
return Msg.Message.parseFrom(array);
}
}

SimpleProtobufEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class SimpleProtobufEncoder extends MessageToByteEncoder<Msg.Message> {

@Override
protected void encode(ChannelHandlerContext ctx, Msg.Message msg, ByteBuf out) throws Exception {
encode0(msg, out);
}

public static void encode0(Msg.Message msg, ByteBuf out) {
out.writeShort(ProtoInstant.MAGIC_CODE);
out.writeShort(ProtoInstant.VERSION_CODE);

byte[] bytes = msg.toByteArray();// 将 Msg.Message 对象转换为 byte

// 预留加密消息体
/*ThreeDES des = channel.channel().attr(Constants.ENCRYPT).get();
byte[] encryptByte = des.encrypt(bytes);*/
int length = bytes.length;// 读取消息的长度
Logger.cfo("encoder length=" + length);

// 先将消息长度写入,也就是消息头
out.writeInt(length);
// 消息体中包含我们要发送的数据
out.writeBytes(bytes);

/*log.debug("send "
+ "[remote ip:" + ctx.channel().remoteAddress()
+ "][total length:" + length
+ "][bare length:" + msg.getSerializedSize() + "]");*/
}
}


异常类定义

InvalidFrameException

1
2
3
4
5
public class InvalidFrameException extends Exception {
public InvalidFrameException(String s) {
super(s);
}
}


常数定义

ProtoInstant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class ProtoInstant {
/**
* 魔数,可以通过配置获取
*/
public static final short MAGIC_CODE = 0x86;

/**
* 版本号
*/
public static final short VERSION_CODE = 0x01;

/**
* 客户端平台
*/
public interface Platform {
/**
* windows
*/
public static final int WINDOWS = 1;
/**
* mac
*/
public static final int MAC = 2;
/**
* android端
*/
public static final int ANDROID = 3;
/**
* IOS端
*/
public static final int IOS = 4;
/**
* WEB端
*/
public static final int WEB = 5;
/**
* 未知
*/
public static final int UNKNOWN = 6;
}

/**
* 返回码枚举类
*/
public enum ResultCodeEnum {
SUCCESS(0, "Success"), // 成功
AUTH_FAILED(1, "登录失败"),
NO_TOKEN(2, "没有授权码"),
UNKNOWN_ERROR(3, "未知错误"),
;

private Integer code;
private String desc;

ResultCodeEnum(Integer code, String desc) {
this.code = code;
this.desc = desc;
}

public Integer getCode() {
return code;
}

public String getDesc() {
return desc;
}
}
}


server 模块

依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.demo</groupId>
<artifactId>im-solo</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>im-server</artifactId>

<dependencies>
<dependency>
<groupId>com.demo</groupId>
<artifactId>im-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>

<build>
<finalName>im-server</finalName>

<plugins>
<plugin>
<!--编译插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
<parameters>true</parameters>
</configuration>
</plugin>

<plugin>
<!--只有包含 main 方法的启动模块才需要这个插件-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<excludeDevtools>true</excludeDevtools>
<mainClass>com.owlias.im.server.ServerApp</mainClass>
</configuration>
</plugin>

<plugin>
<!--打分发包插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


启动相关类

采用了 Spring Boot + Netty 的深度集成方案。利用 Spring 管理业务逻辑(如 LoginRequestHandler),利用 Netty 处理极致的网络 I/O,两者各司其职。在 ServerApp 中,采用非常稳健的启动顺序:

  • Spring 优先:先启动 Spring 容器,完成所有 Bean(依赖注入、配置读取)的初始化。
  • Netty 紧随其后:从容器中取出 ChatNettyServer 并运行。

保证当 Netty 开始接收连接时,所有的业务处理器(Handler)已经全部就绪,不会出现空指针异常。

ServerApp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class ServerApp {

public static void main(String[] args) {
// 1. 启动并初始化 Spring 环境
ApplicationContext context = SpringApplication.run(ServerApp.class, args);

// 2. 获取 Netty 服务 Bean 并运行
ChatNettyServer nettyServer = context.getBean(ChatNettyServer.class);
nettyServer.run();
}
}

ChatNettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j
@Service
public class ChatNettyServer {

@Value("${server.port:8081}")
private int port;

// 将线程组设为成员变量,方便优雅关闭
private EventLoopGroup bg;
private EventLoopGroup wg;

@Resource
private LoginRequestHandler loginRequestHandler;

@Resource
private ServerExceptionHandler serverExceptionHandler;

public void run() {
bg = new NioEventLoopGroup(1); // BossGroup:只负责接收连接 (Acceptor)
wg = new NioEventLoopGroup(); // WorkerGroup:负责处理 I/O 读写和业务逻辑,默认是CPU核心数的2倍

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bg, wg)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 使用池化分配器(类似线程池的思想)复用已有的内存块,极大降低CPU和内存损耗
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 1. 编解码器(处理 Protobuf)
ch.pipeline().addLast("decoder", new SimpleProtobufDecoder()); // inbound
ch.pipeline().addLast("encoder", new SimpleProtobufEncoder()); // outbound

// 2. 登录校验(登录成功后该 Handler 通常会从 pipeline 中移除自己)
ch.pipeline().addLast("login", loginRequestHandler); // inbound

// 4. 异常处理(放在流水线末尾捕获所有 Handler 抛出的异常)
ch.pipeline().addLast("exception", serverExceptionHandler); // inbound
}
});

ChannelFuture channelFuture = b.bind().sync();
log.info("IM-Server 启动成功, 监听地址: {}", channelFuture.channel().localAddress());
// 阻塞直到频道关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("IM-Server 启动失败", e);
} finally {
close();
}
}

// 当 Spring 容器关闭时,确保 Netty 优雅退出
@PreDestroy
public void close() {
if (wg != null) wg.shutdownGracefully();
if (bg != null) bg.shutdownGracefully();
log.info("IM-Server 已优雅关闭并释放线程资源");
}
}


session 管理

SessionMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j
@Data
public class SessionMap {
// 单例
private SessionMap() {}
private static SessionMap singleInstance = new SessionMap();
public static SessionMap inst() {
return singleInstance;
}

// 会话集合
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>();

/**
* 增加session对象
*/
public void addSession(ServerSession s) {
map.put(s.getSessionId(), s);
log.info("用户登录:id= {} 在线总数: {}", s.getUser().getUid(), map.size());
}

/**
* 获取session对象
*/
public ServerSession getSession(String sessionId) {
return map.getOrDefault(sessionId, null);
}

/**
* 根据用户id,获取session对象
*/
public List<ServerSession> getSessionsBy(String userId) {
return map.values()
.stream()
.filter(s -> s.getUser().getUid().equals(userId))
.collect(Collectors.toList());
}

/**
* 删除session
*/
public void removeSession(String sessionId) {
if (!map.containsKey(sessionId)) {
return;
}
ServerSession s = map.get(sessionId);
map.remove(sessionId);
Logger.tcfo("用户下线:id= " + s.getUser().getUid() + " 在线总数: " + map.size());
}

/**
* 判断是否登录
*/
public boolean hasLogin(User user) {
for (Map.Entry<String, ServerSession> next : map.entrySet()) { // 如果人数过多,请新增一个 userToSessionsMap,防止过多的遍历
User u = next.getValue().getUser();
if (u.getUid().equals(user.getUid()) && u.getPlatform().equals(user.getPlatform())) {
return true;
}
}
return false;
}
}

ServerSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.owlias.im.server.session;

import com.owlias.im.common.core.bean.User;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;

@Data
@Slf4j
public class ServerSession {
public static final AttributeKey<ServerSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");

private Channel channel;
private User user;
@Getter
private final String sessionId;
private boolean isLogin = false; // 登录状态

public ServerSession(Channel channel) {
// 完成正向绑定 session 绑定 channel
this.channel = channel;
this.sessionId = buildNewSessionId();
}

public static ServerSession getSession(ChannelHandlerContext ctx) {
// 反向导航(不在需要从sessionMap中查找,效率奇高)
Channel channel = ctx.channel();
return channel.attr(ServerSession.SESSION_KEY).get();
}

public static void closeSession(ChannelHandlerContext ctx) {
// 关闭连接
ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get();
if (null != session && session.isValid()) {
session.close();
SessionMap.inst().removeSession(session.getSessionId());
}
}

public void reverseBind() {
// 反向绑定,最终和 channel 通道实现双向绑定。顺便加入到会话集合中
channel.attr(ServerSession.SESSION_KEY).set(this);
log.info("完成 channel 绑定 session {}", channel.remoteAddress());
SessionMap.inst().addSession(this);
isLogin = true;
}

public void unbind() {
isLogin = false;
SessionMap.inst().removeSession(getSessionId());
this.close();
}

private static String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
}

public boolean isValid() {
return getUser() != null;
}

public synchronized void writeAndFlush(Object pkg) {
/*// 当系统水位过高时,系统不应继续发送消息,防止发送队列积压
// 写Protobuf数据帧
if (channel.isWritable()) { // 低水位
channel.writeAndFlush(pkg);
} else { // 高水位时
log.debug("通道很忙,消息被暂存了");
// 写入消息暂存的分布式存储,如果mongo等channel空闲之后,再写出去
}*/

if (channel.isActive()) {
// 生产环境建议保留上面注释掉的背压代码。我们在演示环境只实现一个失败的监听
channel.writeAndFlush(pkg).addListener(future -> {
if (!future.isSuccess()) {
log.error("回写消息失败", future.cause());
}
});
}
}

// 关闭连接
public synchronized void close() {
ChannelFuture future = channel.close();
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
log.error("CHANNEL_CLOSED error ");
}
});
}

public void setUser(User user) {
this.user = user;
user.setSessionId(sessionId);
}
}


入站处理链条相关

动态流水线
阶段 A:鉴权态
Decoder
LoginResponseH.
ExceptionH.
Login Success
Dynamic Update
阶段 B:业务态
Decoder
HeartBeatH.
ChatMsgH.
ExceptionH.

SimpleProtobufDecoder -> LoginRequestHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import com.owlias.im.common.concurrent.CallbackTask;
import com.owlias.im.common.concurrent.CallbackTaskScheduler;
import com.owlias.im.common.core.bean.msg.Msg;
import com.owlias.im.server.processor.LoginProcessor;
import com.owlias.im.server.session.ServerSession;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@ChannelHandler.Sharable // 这意味着所有的连接都共享这一个实例,可以极大节省资源,防止每次new新的handler
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {

@Resource
private LoginProcessor loginProcessor;

@Resource
private ChatRedirectHandler chatRedirectHandler;

@Resource
private LogoutRequestHandler logoutRequestHandler;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("收到一个新的连接,但是没有登录 {}", ctx.channel().id());
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof Msg.Message pkg)) {
super.channelRead(ctx, msg);
return;
}

// 取得请求类型
Msg.HeadType headType = pkg.getType();
if (!headType.equals(loginProcessor.type())) {
super.channelRead(ctx, msg);
return;
}

// 初始化会话
ServerSession session = new ServerSession(ctx.channel());
// 异步任务,处理登录的逻辑,防止阻塞
CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
@Override
public Boolean execute() throws Exception {
return loginProcessor.action(session, pkg);
}

// 异步任务返回
@Override
public void onBack(Boolean r) {
if (r) {
// 1. 先进行 isActive 判断,确保通道依然活跃(应对这个时候业务用户关掉了app)
if (!ctx.channel().isActive()) return;

// 动态管理流水线节点
String currentName = ctx.name(); // 动态获取当前处理器的名字
ctx.pipeline().addAfter(currentName, "heartBeat", new HeartBeatServerHandler()); // duplex bound
ctx.pipeline().addAfter("heartBeat", "logout", logoutRequestHandler); // inbound
ctx.pipeline().addAfter("logout", "chat", chatRedirectHandler); // inbound
ctx.pipeline().remove(LoginRequestHandler.this); // 登录成功移除登录请求的handler,节省资源

log.info("登录成功: {}", session.getUser());
} else {
ServerSession.closeSession(ctx);
log.info("登录失败: {}", session.getUser());
}
}

// 异步任务异常
@Override
public void onException(Throwable t) {
ServerSession.closeSession(ctx);
log.info("登录异常: {}", session.getUser());
}
});
}
}

HeartBeatServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* @author KJ
* @description 继承自 IdleStateHandler,它内部持有状态(比如上次读取数据的时间戳),绝对不能标注为 @Sharable,需通过 new 创建。
*/
@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
// 通常客户端的心跳发送频率会设为 60 秒或 90 秒。这里设置 150 秒(约2.5倍频率)预留了网络波动和拥塞的容错时间,避免了因一两次丢包就误杀连接。
private static final int READ_IDLE_GAP = 150;

public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (!(msg instanceof Msg.Message pkg)) {
super.channelRead(ctx, msg);
return;
}

// 判断消息类型
Msg.HeadType headType = pkg.getType();
if (headType.equals(Msg.HeadType.HEART_BEAT)) {
// 异步处理,将心跳包,直接回复给客户端
FutureTaskScheduler.add(() -> {
if (ctx.channel().isActive()) {
ctx.writeAndFlush(msg);
}
});
}
super.channelRead(ctx, msg);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.READER_IDLE) {
log.warn("心跳读取超时,强制关闭连接: {}", ctx.channel().id());
// 调用 ServerSession.closeSession(ctx) 进行清理(关闭 Channel 并清理 SessionMap)
ServerSession.closeSession(ctx);
}
}
}
}

LogoutRequestHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
@ChannelHandler.Sharable
public class LogoutRequestHandler extends ChannelInboundHandlerAdapter {

@Resource
private LogoutProcessor logoutProcessor;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof Msg.Message pkg) || !pkg.getType().equals(logoutProcessor.type())) {
super.channelRead(ctx, msg);
return;
}

ServerSession session = ServerSession.getSession(ctx);
// 异步执行登出逻辑
FutureTaskScheduler.add(() -> logoutProcessor.action(session, pkg));
}
}

ChatRedirectHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@Service
@ChannelHandler.Sharable
public class ChatRedirectHandler extends ChannelInboundHandlerAdapter {

@Resource
ChatRedirectProcessor chatRedirectProcessor;

/**
* 收到消息
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof Msg.Message)) {
super.channelRead(ctx, msg);
return;
}

// 判断消息类型
Msg.Message pkg = (Msg.Message) msg;
Msg.HeadType headType = ((Msg.Message) msg).getType();
if (!headType.equals(chatRedirectProcessor.type())) {
super.channelRead(ctx, msg);
return;
}

// 反向导航
ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get();

// 判断是否登录
if (null == session || !session.isLogin()) {
log.error("用户尚未登录,不能发送消息");
return;
}

// 异步处理IM消息转发的逻辑
FutureTaskScheduler.add(() -> chatRedirectProcessor.action(session, pkg));
}
}

ServerExceptionHandler:作为整个 Netty 流水线的最后一道防线,ServerExceptionHandler 扮演着后勤部和战场清理员的角色。它的存在确保了系统即使在面对网络波动、恶意攻击或意料之外的代码错误时,依然能保持资源不泄露和状态一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
@ChannelHandler.Sharable
@Service
public class ServerExceptionHandler extends ChannelInboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof InvalidFrameException) {
// 自定义的业务异常(例如魔数不对、版本不匹配),记录日志,方便排查
log.error("发生未捕获异常: ", cause);
}
if (cause instanceof IOException) {
// 网络层异常(用户直接划掉 App、信号断开、或手机电量耗尽关机时),防止服务器内存中出现幽灵在线用户。
log.error(cause.getMessage());
log.error("客户端已经关闭连接,这里需要做下线处理");
ServerSession.closeSession(ctx);
} else {
log.error("发生未捕获异常: ", cause);
}
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 通道 Read 读取 Complete 完成,做刷新操作
ctx.flush();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 保证无论是异常断开还是正常断开,用户的 Session 最终都会被销毁。
// 在 IM 系统中,Session 的及时清理至关重要,否则会导致“消息转发给了已断开的连接”而丢失。
ServerSession.closeSession(ctx);
}
}


线程 Processor

ServerProcessor 接口:

1
2
3
4
public interface ServerProcessor {
Msg.HeadType type();
boolean action(ServerSession ch, Msg.Message proto);
}

LoginProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import com.owlias.im.common.core.ProtoInstant;
import com.owlias.im.common.core.bean.User;
import com.owlias.im.common.core.bean.msg.Msg;
import com.owlias.im.server.converter.LoginResponseConverter;
import com.owlias.im.server.session.ServerSession;
import com.owlias.im.server.session.SessionMap;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;

@Slf4j
@Service
public class LoginProcessor implements ServerProcessor {

@Resource
LoginResponseConverter loginResponseConverter;

public Msg.HeadType type() {
return Msg.HeadType.LOGIN_REQUEST;
}

@Override
public boolean action(ServerSession session, Msg.Message proto) {
Msg.LoginRequest info = proto.getLoginRequest();
long seqNo = proto.getSequence();
User user = User.fromLoginMsg(info);

// 检查用户
boolean isValidUser = checkUser(user);
if (!isValidUser) {
ProtoInstant.ResultCodeEnum resultCode = ProtoInstant.ResultCodeEnum.NO_TOKEN;
// 构造登录失败的报文
Msg.Message response = loginResponseConverter.build(resultCode, seqNo, "-1");
// 发送登录失败的报文
session.writeAndFlush(response);
return false;
}

session.setUser(user);
// 传输 channel 反向绑定服务端 session
session.reverseBind();

// 登录成功
ProtoInstant.ResultCodeEnum resultCode = ProtoInstant.ResultCodeEnum.SUCCESS;
// 构造登录成功的报文
Msg.Message response = loginResponseConverter.build(resultCode, seqNo, session.getSessionId());
// 发送登录成功的报文
session.writeAndFlush(response);
return true;
}

/**
* 登录检查,演示环境默认登陆成功
*/
private boolean checkUser(User user) {
/*if (SessionMap.inst().hasLogin(user)) {
return false;
}
// 校验用户,比较耗时的操作,需要100 ms以上的时间
// 方法1:调用远程用户 restfull 校验服务
// 方法2:调用数据库接口校验
return true;*/

// 在 oldSessions.forEach(ServerSession::close) 之前,可以先给旧连接发送一个特殊的 MessageNotification(例如:code=KICK_OUT)
// 告知用户“您的账号在其他地方登录,您已被迫下线”。否则旧客户端会表现为“无缘无故断线”,体验较差。
List<ServerSession> oldSessions = SessionMap.inst().getSessionsBy(user.getUid());
if (oldSessions != null && !oldSessions.isEmpty()) {
log.warn("用户 {} 重复登录,正在清理旧连接", user.getUid()); // 同账号登录踢掉旧的!
// 强制关闭旧连接,SessionMap 会在 channelInactive 中自动清理
oldSessions.forEach(ServerSession::close);
}
return true; // 允许新连接登录
}
}

LoginResponseConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class LoginResponseConverter {
/**
* 登录应答 应答消息protobuf
*/
public Msg.Message build(ProtoInstant.ResultCodeEnum en, long seqId, String sessionId) {
Msg.Message.Builder outer = Msg.Message.newBuilder()
.setType(Msg.HeadType.LOGIN_RESPONSE) // 设置消息类型
.setSequence(seqId)
.setSessionId(sessionId); // 设置应答流水,与请求对应

Msg.LoginResponse.Builder b = Msg.LoginResponse.newBuilder()
.setCode(en.getCode())
.setInfo(en.getDesc())
.setExpose(1);

outer.setLoginResponse(b.build());
return outer.build();
}
}

LogoutProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Service
public class LogoutProcessor implements ServerProcessor {
@Override
public Msg.HeadType type() {
return Msg.HeadType.LOGOUT_REQUEST;
}

@Override
public boolean action(ServerSession session, Msg.Message proto) {
if (session != null) {
log.info("用户下线请求: {}", session.getUser());
// 1. 解绑并移除 Session
session.unbind();
// 2. 关闭连接
session.close();
}
return true;
}
}

ChatRedirectProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Service
public class ChatRedirectProcessor implements ServerProcessor {

@Override
public Msg.HeadType type() {
return Msg.HeadType.MESSAGE_REQUEST;
}

@Override
public boolean action(ServerSession ch, Msg.Message proto) {
// 聊天处理
Msg.MessageRequest msg = proto.getMessageRequest();
Logger.tcfo("chatMsg | from="
+ msg.getFrom()
+ " , to=" + msg.getTo()
+ " , content=" + msg.getContent());
// 获取接收方的 chatID
String to = msg.getTo();
List<ServerSession> toSessions = SessionMap.inst().getSessionsBy(to);
if (toSessions == null) {
// 接收方离线
Logger.tcfo("[" + to + "] 不在线,发送失败!");
} else {
toSessions.forEach((session) -> {
// 将IM消息发送到接收方
session.writeAndFlush(proto);
});
}
return true;
}
}


client 模块

依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.demo</groupId>
<artifactId>im-solo</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>im-client</artifactId>

<dependencies>
<dependency>
<groupId>com.demo</groupId>
<artifactId>im-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>

<build>
<finalName>im-client</finalName>

<plugins>
<plugin>
<!--编译插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
<parameters>true</parameters>
</configuration>
</plugin>

<plugin>
<!--只有包含 main 方法的启动模块才需要这个插件-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<excludeDevtools>true</excludeDevtools>
<mainClass>com.owlias.im.client.ClientApp</mainClass>
</configuration>
</plugin>

<plugin>
<!--打分发包插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


启动相关

ClientApp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@SpringBootApplication
public class ClientApp {

public static void main(String[] args) {
// 1. 启动并初始化 Spring 环境
ApplicationContext context = SpringApplication.run(ClientApp.class, args);

// 2. 启动聊天客户端
CommandController commandClient = context.getBean(CommandController.class);
commandClient.initCommandMap();
try {
commandClient.commandThreadRunning();
} catch (InterruptedException e) {
log.error("commandThreadRunning error", e);
}
}
}

CommandController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import com.owlias.im.client.commond.*;
import com.owlias.im.client.sender.ChatSender;
import com.owlias.im.client.sender.LoginSender;
import com.owlias.im.client.sender.LogoutSender;
import com.owlias.im.client.session.ClientSession;
import com.owlias.im.common.core.bean.User;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

@Slf4j
@Data
@Service
public class CommandController {
@Resource
private ChatNettyClient chatNettyClient;

private Map<String, BaseCommand> commandMap;
@Resource
ClientCommandMenu clientCommandMenu;
@Resource
ChatConsoleCommand chatConsoleCommand;
@Resource
LoginConsoleCommand loginConsoleCommand;
@Resource
LogoutConsoleCommand logoutConsoleCommand;

@Resource
private ChatSender chatSender;
@Resource
private LoginSender loginSender;
@Resource
private LogoutSender logoutSender;

// 线程同步锁对象(本质是为了解决 “主线程运行太快,而网络反馈太慢” 之间的矛盾)
private final Object connectLock = new Object();
private boolean connectFlag = false;
private User user;
private ClientSession session;
private Channel channel;

public boolean isLogin() {
return null != session && session.isLogin();
}

public void initCommandMap() {
// 通过 commandMap 将菜单、登录、聊天指令解耦。
commandMap = new HashMap<>();
commandMap.put(clientCommandMenu.getKey(), clientCommandMenu);
commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand);
commandMap.put(chatConsoleCommand.getKey(), chatConsoleCommand);
commandMap.put(logoutConsoleCommand.getKey(), logoutConsoleCommand);
clientCommandMenu.setAllCommand(commandMap);
}

public void commandThreadRunning() throws InterruptedException {
Scanner scanner = new Scanner(System.in);
while (true) {
// 默认展示菜单
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);

if (null == command) {
System.err.println("请输入正确的指令");
continue;
}

switch (key) {
case LoginConsoleCommand.KEY -> startLogin(scanner);
case ChatConsoleCommand.KEY -> startOneChat(scanner);
case LogoutConsoleCommand.KEY -> startLogout();
}
}
}

private void startLogin(Scanner scanner) {
// 同步执行连接服务端
if (!connectFlag) {
log.info("检测到连接已断开,正在尝试重新连接...");
startConnectServer();
synchronized (connectLock) {
try {
long startTime = System.currentTimeMillis();
// 使用 while 循环检查 connectFlag,防止虚假唤醒
// 当用户输入登录指令时,Netty 的连接动作(doConnect)是异步的。如果主线程不等待,代码会立刻执行 loginSender.sendLoginMsg(),而此时 Channel 可能还没建立,直接导致空指针或发送失败。
while (!connectFlag && (System.currentTimeMillis() - startTime < 5000)) {
connectLock.wait(5000 - (System.currentTimeMillis() - startTime));
}
} catch (InterruptedException e) {
log.error("等待连接中断", e);
}
}
}

if (!connectFlag) {
log.info("连接异常,请重新建立连接");
return;
}

// 登录
loginConsoleCommand.exec(scanner);
User user = new User();
user.setUid(loginConsoleCommand.getUserName());
user.setToken(loginConsoleCommand.getPassword());
user.setDevId("123456");
session.setUser(user);

this.user = user;
loginSender.setUser(user);
loginSender.setSession(session);
loginSender.sendLoginMsg();
}

public void startConnectServer() {
GenericFutureListener<ChannelFuture> listener = (ChannelFuture f) -> {
synchronized (connectLock) {
if (!f.isSuccess()) {
log.info("连接失败!");
connectFlag = false;
} else {
log.info("Owlias-IM 服务器连接成功!");
connectFlag = true;
channel = f.channel();
session = new ClientSession(channel);
session.setConnected(true);

// 监听物理连接断开(如服务器宕机或网络波动)
channel.closeFuture().addListener(closeFuture -> {
synchronized (connectLock) {
log.info("{}: 连接已经断开...", new Date());
connectFlag = false;
if (session != null) {
session.setConnected(false);
session.setLogin(false);
}
}
});
}
// 唤醒在 startLogin 中等待的命令线程
connectLock.notifyAll();
}
};
chatNettyClient.setConnectedListener(listener);
chatNettyClient.doConnect();
}

private void startOneChat(Scanner scanner) {
if (!isLogin()) {
log.info("startOneChat: 还没有登录,请先登录");
return;
}
chatConsoleCommand.exec(scanner);
chatSender.setUser(user);
chatSender.setSession(session);
chatSender.sendChatMsg(chatConsoleCommand.getToUserId(), chatConsoleCommand.getMessage());
}

private void startLogout() {
if (!isLogin()) {
log.info("未登录");
return;
}
logoutSender.setUser(user);
logoutSender.setSession(session);
logoutSender.sendLogoutMsg();

// 修改本地状态
session.setLogin(false);
connectFlag = false;
log.info("本地会话已注销");
}
}

ChatNettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import com.owlias.im.client.config.SystemConfig;
import com.owlias.im.client.handler.ExceptionHandler;
import com.owlias.im.client.handler.LoginResponseHandler;
import com.owlias.im.common.core.codec.SimpleProtobufDecoder;
import com.owlias.im.common.core.codec.SimpleProtobufEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

/**
* @author KJ
* @description
*/
@Slf4j
@Data
@Service
public class ChatNettyClient {

@Resource
private SystemConfig systemConfig;
@Resource
private LoginResponseHandler loginResponseHandler;
@Resource
private ApplicationContext applicationContext;

private GenericFutureListener<ChannelFuture> connectedListener;

private Bootstrap bootstrap;
private EventLoopGroup g;

public ChatNettyClient() {
g = new NioEventLoopGroup(1);
}

public void doConnect() {
if (connectedListener == null) {
log.error("未设置 connectedListener,连接取消");
return;
}

try {
bootstrap = new Bootstrap()
.group(g)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.remoteAddress(systemConfig.getHost(), systemConfig.getPort())
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("decoder", new SimpleProtobufDecoder());
ch.pipeline().addLast("encoder", new SimpleProtobufEncoder());
ch.pipeline().addLast(loginResponseHandler);
ch.pipeline().addLast(applicationContext.getBean(ExceptionHandler.class));
}
}
);

log.info("客户端开始连接 [Owlias IM]");

ChannelFuture f = bootstrap.connect(); // 异步发起连接
f.addListener(connectedListener);
} catch (Exception e) {
log.info("客户端连接失败! {}", e.getMessage());
}
}

public void close() {
g.shutdownGracefully();
}
}


session 相关

ClientSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 实现客户端 Session会话
*/
@Slf4j
@Data
public class ClientSession {
public static final AttributeKey<ClientSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");

/**
* 用户实现客户端会话管理的核心
*/
private Channel channel;
private User user;

/**
* 保存登录后的服务端 sessionId
*/
private String sessionId;
private boolean isConnected = false;
private boolean isLogin = false;

// 连接成功之后绑定通道
public ClientSession(Channel channel) {
// 正向的绑定
this.channel = channel;
this.sessionId = UUID.randomUUID().toString();
// 反向的绑定
channel.attr(ClientSession.SESSION_KEY).set(this);
}

// 登录成功之后设置sessionId
public static void loginSuccess(ChannelHandlerContext ctx, Msg.Message pkg) {
ClientSession session = getSession(ctx);
session.setSessionId(pkg.getSessionId());
session.setLogin(true);
log.info("登录成功");
}

public static ClientSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ClientSession.SESSION_KEY).get();
}

public String getRemoteAddress() {
return channel.remoteAddress().toString();
}

// 写 protobuf 数据帧
public ChannelFuture writeAndFlush(Object pkg) {
return channel.writeAndFlush(pkg);
}

public void writeAndClose(Object pkg) {
ChannelFuture future = channel.writeAndFlush(pkg);
future.addListener(ChannelFutureListener.CLOSE);
}

// 关闭通道
public void close() {
isConnected = false;
ChannelFuture future = channel.close();
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.error("连接顺利断开");
}
});
}
}


converter 相关

BaseConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.owlias.im.client.session.ClientSession;
import com.owlias.im.common.core.bean.msg.Msg;

public class BaseConverter {
protected Msg.HeadType type;
private long seqId;
private ClientSession session;

public BaseConverter(Msg.HeadType type, ClientSession session) {
this.type = type;
this.session = session;
}

/**
* 构建消息基础部分
*/
public Msg.Message buildOuter(long seqId) {
return getOuterBuilder(seqId).buildPartial();
}

/**
* 构建消息基础部分 Builder
*/
public Msg.Message.Builder getOuterBuilder(long seqId) {
this.seqId = seqId;
return Msg.Message.newBuilder()
.setType(type)
.setSessionId(session.getSessionId())
.setSequence(seqId);
}
}

LoginMsgConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class LoginMsgConverter extends BaseConverter {
private final User user;

public LoginMsgConverter(User user, ClientSession session) {
super(Msg.HeadType.LOGIN_REQUEST, session);
this.user = user;
}

public Msg.Message build() {
Msg.Message.Builder outerBuilder = getOuterBuilder(-1);
Msg.LoginRequest.Builder lb =
Msg.LoginRequest.newBuilder()
.setDeviceId(user.getDevId())
.setPlatform(user.getPlatform().ordinal())
.setToken(user.getToken())
.setUid(user.getUid());
return outerBuilder.setLoginRequest(lb).build();
}

public static Msg.Message build(User user, ClientSession session) {
LoginMsgConverter converter = new LoginMsgConverter(user, session);
return converter.build();
}
}

HeartBeatMsgConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HeartBeatMsgConverter extends BaseConverter {
private final User user;

public HeartBeatMsgConverter(User user, ClientSession session) {
super(Msg.HeadType.HEART_BEAT, session);
this.user = user;
}

public Msg.Message build() {
Msg.Message.Builder outerBuilder = getOuterBuilder(-1);
Msg.MessageHeartBeat.Builder inner =
Msg.MessageHeartBeat.newBuilder()
.setSeq(0)
.setJson("{\"from\":\"client\"}")
.setUid(user.getUid());
return outerBuilder.setHeartBeat(inner).build();
}
}

ChatMsgConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ChatMsgConverter extends BaseConverter {
private ChatMsg chatMsg;
private User user;

private ChatMsgConverter(ClientSession session) {
super(Msg.HeadType.MESSAGE_REQUEST, session);
}

public Msg.Message build(ChatMsg chatMsg, User user) {
this.chatMsg = chatMsg;
this.user = user;
Msg.Message.Builder outerBuilder = getOuterBuilder(-1);
Msg.MessageRequest.Builder cb = Msg.MessageRequest.newBuilder();
// 填充字段
this.chatMsg.fillMsg(cb);
return outerBuilder.setMessageRequest(cb).build();
}

public static Msg.Message build(
ChatMsg chatMsg,
User user,
ClientSession session) {
ChatMsgConverter chatMsgConverter = new ChatMsgConverter(session);
return chatMsgConverter.build(chatMsg, user);
}
}


command 相关

BaseCommand

1
2
3
4
5
6
7
8
9
10
11
public interface BaseCommand {

// 获取命令的key
String getKey();

// 获取命令的提示信息
String getTip();

// 从控制台提取业务数据
void exec(Scanner scanner);
}

ClientCommandMenu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Data
@Service
public class ClientCommandMenu implements BaseCommand {

public static final String KEY = "0";
private String allCommandsShow;
private String commandInput;

@Override
public String getKey() {
return KEY;
}

@Override
public String getTip() {
return "show 所有命令";
}

@Override
public void exec(Scanner scanner) {
System.err.println("请输入某个操作指令:" + allCommandsShow);
// 获取第一个指令
commandInput = scanner.next();
}

public void setAllCommand(Map<String, BaseCommand> commandMap) {
Set<Map.Entry<String, BaseCommand>> entries = commandMap.entrySet();
Iterator<Map.Entry<String, BaseCommand>> iterator = entries.iterator();

StringBuilder menus = new StringBuilder();
menus.append("[menu] ");
while (iterator.hasNext()) {
BaseCommand next = iterator.next().getValue();
menus.append(next.getKey())
.append("->")
.append(next.getTip())
.append(" | ");

}
allCommandsShow = menus.toString();
}
}

LoginConsoleCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Data
@Service
public class LoginConsoleCommand implements BaseCommand {
public static final String KEY = "1";
private String userName;
private String password;

@Override
public void exec(Scanner scanner) {
System.out.println("请输入用户信息(id@password) ");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split("@");
if (info.length != 2) {
System.out.println("请按照格式输入(id@password):");
} else {
break;
}
}
userName = info[0];
password = info[1];
}

@Override
public String getKey() {
return KEY;
}

@Override
public String getTip() {
return "登录";
}
}

ChatConsoleCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Data
@Service
public class implements BaseCommand {
private String toUserId;
private String message;
public static final String KEY = "2";

@Override
public void exec(Scanner scanner) {
System.out.print("请输入聊天的消息(id:message):");
String[] info = null;
while (true) {
String input = scanner.nextLine(); // 使用 nextLine() 代替 next()
info = input.split(":");
if (info.length != 2) {
System.out.println("请输入聊天的消息(id:message):");
} else {
break;
}
}
toUserId = info[0];
message = info[1];
}


@Override
public String getKey() {
return KEY;
}

@Override
public String getTip() {
return "聊天";
}
}

LogoutConsoleCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class LogoutConsoleCommand implements BaseCommand {
public static final String KEY = "10";

@Override
public void exec(Scanner scanner) {
Logger.cfo("退出命令执行成功");
}

@Override
public String getKey() {
return KEY;
}

@Override
public String getTip() {
return "退出";
}
}


sender 相关

BaseSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Data
@Slf4j
public abstract class BaseSender {
private User user;
private ClientSession session;

public boolean isConnected() {
if (null == session) {
log.info("isConnected: session is null");
return false;
}
return session.isConnected();
}

public boolean isLogin() {
if (null == session) {
log.info("isLogin: session is null");
return false;
}
return session.isLogin();
}

public void sendMsg(Msg.Message message) {
if (null == getSession() || !isConnected()) {
log.info("连接还没成功");
return;
}
Channel channel = getSession().getChannel();
ChannelFuture f = channel.writeAndFlush(message);
f.addListener(future -> {
// 回调
if (future.isSuccess()) {
sendSuccessCallback(message);
} else {
sendFailedCallback(message);
}
});
}

protected void sendSuccessCallback(Msg.Message message) {
log.info("发送成功");

}

protected void sendFailedCallback(Msg.Message message) {
log.info("发送失败");
}
}

LoginSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Service
public class LoginSender extends BaseSender {
public void sendLoginMsg() {
if (!isConnected()) {
log.info("还没有建立连接!");
return;
}
log.info("构造登录消息");
Msg.Message message = LoginMsgConverter.build(getUser(), getSession());
log.info("发送登录消息");
super.sendMsg(message);
}
}

ChatSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@Service
public class ChatSender extends BaseSender {

public void sendChatMsg(String toUid, String content) {
log.info("发送消息 startConnectServer");
ChatMsg chatMsg = new ChatMsg(getUser());
chatMsg.setContent(content);
chatMsg.setMsgType(ChatMsg.MSG_TYPE.TEXT);
chatMsg.setTo(toUid);
chatMsg.setMsgId(System.currentTimeMillis());
Msg.Message message = ChatMsgConverter.build(chatMsg, getUser(), getSession());
super.sendMsg(message);
}

@Override
protected void sendSuccessCallback(Msg.Message message) {
log.info("发送成功:{}", message.getMessageRequest().getContent());
}

@Override
protected void sendFailedCallback(Msg.Message message) {
log.info("发送失败:{}", message.getMessageRequest().getContent());
}
}

LogoutSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Service
public class LogoutSender extends BaseSender {
public void sendLogoutMsg() {
if (!isLogin()) return;

// 构造 Protobuf 消息,类型设为 LOGOUT_REQUEST
Msg.Message message = Msg.Message.newBuilder()
.setType(Msg.HeadType.LOGOUT_REQUEST)
.setSessionId(getSession().getSessionId())
.build();

super.sendMsg(message);
log.info("登出请求已发送");
}
}


handler 相关

LoginResponseHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Slf4j
@ChannelHandler.Sharable
@Service
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {

@Resource
private ChatMsgHandler chatMsgHandler;

@Resource
private HeartBeatClientHandler heartBeatClientHandler;

/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (!(msg instanceof Msg.Message pkg)) {
super.channelRead(ctx, msg);
return;
}

// 判断类型
Msg.HeadType headType = pkg.getType();
if (!headType.equals(Msg.HeadType.LOGIN_RESPONSE)) {
super.channelRead(ctx, msg); // 把球继续传给队友
return;
}

// 判断返回是否成功
Msg.LoginResponse info = pkg.getLoginResponse();
ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()];
if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
// 登录失败
log.info(result.getDesc());
} else {
// 登录成功
ClientSession.loginSuccess(ctx, pkg);
ChannelPipeline p = ctx.pipeline();

// 最好先加后删,确保逻辑不中断,两个 add 执行完 decoder - heartbeat - chat - LoginResponseHandler - ExceptionHandler。
p.addBefore(ctx.name(), "heartbeat", heartBeatClientHandler);
p.addBefore(ctx.name(), "chat", chatMsgHandler);

// Netty 的 Handler 只有在刚加入 Pipeline 且 Channel 活跃时会自动触发 channelActive。
// 当登录成功时,连接早已建立(Active 过了)。新加入的 heartBeatClientHandler 会错过 channelActive 事件,导致心跳定时器根本没启动。
// 通过手动调用 startHeartBeat 显式触发。这保证了客户端在登录成功的瞬间,保活机制就立刻上线。
heartBeatClientHandler.startHeartBeat(ctx);

// 移除登录响应处理器
p.remove(this);
log.info("登录成功,业务处理器已就绪");
}
}
}

HeartBeatClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
@ChannelHandler.Sharable
@Service
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
// 心跳的时间间隔,单位为s。通常建议设为服务端检测周期的 1/3。例如服务端 150s 检查一次,客户端建议 50s 发一次。
// 这里 1/3 的配置,允许客户端在网络拥塞时连续丢掉 2 个心跳包,只要第 3 个能送达,连接就不会断。
private static final int HEARTBEAT_INTERVAL = 50;
private boolean isStarted = false;

// 在Handler 被加入到 Pipeline 时,开始发送心跳
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!isStarted) {
startHeartBeat(ctx);
isStarted = true;
}
}

public void startHeartBeat(ChannelHandlerContext ctx) {
if (!isStarted) {
ClientSession session = ClientSession.getSession(ctx);
Msg.Message message = new HeartBeatMsgConverter(session.getUser(), session).build();
heartBeat(ctx, message); // 发送心跳
isStarted = true;
}
}


// 使用定时器,发送心跳报文
public void heartBeat(ChannelHandlerContext ctx, Msg.Message heartbeatMsg) {
// scheduleAtFixedRate 是死板的定时,如果某次发送因为网络阻塞延迟了,定时器依然会准时触发,可能导致任务堆积。
// 而我们的方案是,在前一个任务执行完之后,再调度下一个。这保证了心跳之间永远有稳定的 HEARTBEAT_INTERVAL 间隔,系统负荷更平滑。
// 使用 ctx.executor() 是最正宗、性能最高的方式。任务绑定在当前客户端连接的线程上。连接断开了,任务通常随着 EventLoop 的关闭而停止。
// 它利用了 Netty 内置的定时调度任务队列(基于微型堆实现),比 Java 原生的 Timer 要轻量得多。
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
// 背压感知的细节
if (ctx.channel().isWritable()) {
log.info("发送 HEART_BEAT 消息到 server");
ctx.writeAndFlush(heartbeatMsg); // 非阻塞的,非常好!
} else {
log.warn("通道繁忙,跳过本次心跳发送");
}
// 无论本次是否发送成功,都要继续递归调度下一次,否则心跳就彻底停了
heartBeat(ctx, heartbeatMsg);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}

/**
* 接受到服务器的心跳回写
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (!(msg instanceof Msg.Message pkg)) {
super.channelRead(ctx, msg);
return;
}

// 判断类型
Msg.HeadType headType = pkg.getType();
if (headType.equals(Msg.HeadType.HEART_BEAT)) {
log.info("收到回写的 HEART_BEAT 消息 from server");
return;
} else {
super.channelRead(ctx, msg);
}
}
}

ChatMsgHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@ChannelHandler.Sharable
@Service
public class ChatMsgHandler extends ChannelInboundHandlerAdapter {

/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (!(msg instanceof Msg.Message pkg)) {
super.channelRead(ctx, msg);
return;
}

// 判断类型
Msg.HeadType headType = pkg.getType();
if (!headType.equals(Msg.HeadType.MESSAGE_REQUEST)) {
super.channelRead(ctx, msg);
return;
}

Msg.MessageRequest req = pkg.getMessageRequest();
String content = req.getContent();
String uid = req.getFrom();
System.out.println(" 收到消息 from uid:" + uid + " -> " + content);
}
}

ExceptionHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@ChannelHandler.Sharable
@Service
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Resource
private CommandController commandController;

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof InvalidFrameException) {
// 这是你的私有协议校验失败抛出的。如果魔数或版本号不对,说明数据帧已经损坏或者遭到了篡改。
// 此时选择 ClientSession.getSession(ctx).close(),这不仅关闭了物理连接,还确保了本地 Session 状态的清理。
// 由于这是协议级错误,通常不会立即触发重连,防止由于版本不匹配导致的无效死循环。
log.error(cause.getMessage());
ClientSession.getSession(ctx).close();
} else {
log.error(cause.getMessage());
ctx.close(); // 释放掉旧的、已经失效的文件句柄
if (null == commandController) {
return;
}
commandController.setConnectFlag(false); // 通知主线程连接已失效。
commandController.startConnectServer(); // 自愈逻辑,立刻尝试寻找服务器并重新建立管道。// 这里只做演示,实际的自愈应避免“重连风暴”,可以加入指数退避算法的逻辑
}
}

/**
* 通道 Read 读取 Complete 完成,做刷新操作
*/
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 虽然客户端发送频率可能不如服务端高,但 flush 操作确保了数据真正从 Netty 的缓冲区推送到操作系统的 TCP 缓冲区。
// 在高频聊天或心跳回复时,这种批量刷新机制能有效减少系统调用,保护电池续航(对于移动端而言)。
ctx.flush();
}
}


使用 assembly 打生产包

如何配置

服务端 im-server

依赖配置:pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<build>
<finalName>im-server</finalName>
<plugins>
<plugin>
<!--编译插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<!--只有包含 main 方法的启动模块才需要这个插件-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<excludeDevtools>true</excludeDevtools>
<mainClass>com.owlias.im.server.ServerApp</mainClass>
</configuration>
</plugin>
<plugin>
<!--打分发包插件-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

日志配置:src/main/resources/lockback.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="APP_Name" value="im-server" />
<contextName>${APP_Name}</contextName>

<property name="LOG_HOME" value="${im_server_path:-./logs}" />
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}:%L - %msg%n" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/${APP_Name}.log</file>
<!--每天生成的旧日志自动压缩,磁盘占用减少90%-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/history/${APP_Name}-%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
<!--防止单日日志量过大导致文件无法打开,超过 100MB 自动分片。-->
<maxFileSize>100MB</maxFileSize>
<!--最多只保留30天,且总大小不超过20G-->
<MaxHistory>30</MaxHistory>
<totalSizeCap>20GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<!--确保写日志不阻塞 Netty 的 I/O 线程,这在高并发下能显著降低延迟-->
<appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<queueSize>1024</queueSize>
<appender-ref ref="FILE" />
<includeCallerData>true</includeCallerData>
</appender>

<logger name="org.springframework" level="WARN" />
<logger name="io.netty" level="WARN" />
<logger name="org.hibernate" level="WARN" />
<logger name="com.apache.ibatis" level="WARN"/>
<logger name="org.apache.shiro" level="WARN"/>
<logger name="springfox.documentation" level="WARN"/>

<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="ASYNC_FILE" />
</root>
</configuration>

项目配置文件:src/main/resources/application.properties

1
2
3
4
chat.server.port=9000
chat.server.ip=127.0.0.1
server.web.user.url=http://localhost:8080/user
spring.main.web-application-type=none

打包配置:src/main/assembly/assembly.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<assembly>
<id>Alpha</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/assembly/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<!--<include>*.jar</include>-->
<include>${project.build.finalName}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>src/main/resources</directory>
<outputDirectory>config</outputDirectory>
<includes>
<include>application.properties</include>
<include>logback.xml</include>
</includes>
</fileSet>
</fileSets>
</assembly>

启动可执行文件:src/main/assembly/bin/start.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/bin/bash

# 获取脚本所在目录的上一级目录作为工作目录
WORK_PATH=$(cd $(dirname $0)/..; pwd)
# 对应 pom.xml 中的 finalName
JAR_NAME="im-client.jar"
JVM="-server -Xms128m -Xmx512m"
# 对应 logback.xml 中的变量名
export im_client_path="${WORK_PATH}/logs"

# 确保日志目录存在
mkdir -p ${im_client_path}

function start() {
echo "Starting IM-Client..."
# 移除 nohup 和 &,让程序在前台运行,这样你才能用 Scanner 输入,并且显示指定 logback.xml
java ${JVM} -Dlogging.config=${WORK_PATH}/config/logback.xml -jar ${WORK_PATH}/lib/${JAR_NAME} --spring.config.location=${WORK_PATH}/config/application.properties
}

function stop() {
pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}')
if [ -n "$pid" ]; then
echo "Stopping IM-Client (pid: $pid)..."
kill -15 "$pid" # 建议先用 15 触发 @PreDestroy 优雅关闭
sleep 3
# 如果还没关掉再强制关闭
kill -9 "$pid" 2>/dev/null
else
echo "${JAR_NAME} is not running."
fi
}

function status() {
pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}')
if [ -n "$pid" ]; then
echo "${JAR_NAME} is running, PID is $pid"
else
echo "${JAR_NAME} is stopped"
fi
}

case "$1" in
start) start ;;
stop) stop ;;
restart) stop; start ;;
status) status ;;
*) echo "Usage: $0 {start|stop|restart|status}" ;;
esac


客户端 im-client

日志配置:src/main/resources/lockback.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="APP_Name" value="im-client" />
<property name="LOG_HOME" value="${im_server_path:-./logs}" />
<contextName>${APP_Name}</contextName>

<conversionRule conversionWord="clr" class="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wEx" class="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />

<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(LN:%L){faint} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/${APP_Name}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/history/${APP_Name}-%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
<maxFileSize>50MB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}:%L - %msg%n</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<logger name="org.springframework" level="WARN" />
<logger name="io.netty" level="WARN" />
<logger name="org.apache.http" level="WARN" />

<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
</configuration>

打包配置:src/main/assembly/assembly.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<assembly>
<id>Alpha</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/assembly/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<!--<include>*.jar</include>-->
<include>${project.build.finalName}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>src/main/resources</directory>
<outputDirectory>config</outputDirectory>
<includes>
<include>application.properties</include>
<include>logback.xml</include>
</includes>
</fileSet>
</fileSets>
</assembly>

启动可执行文件:src/main/assembly/bin/start.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/bin/bash

# 获取脚本所在目录的上一级目录作为工作目录
WORK_PATH=$(cd $(dirname $0)/..; pwd)
# 对应 pom.xml 中的 finalName
JAR_NAME="im-client.jar"
JVM="-server -Xms128m -Xmx512m"
# 对应 logback.xml 中的变量名
export im_client_path="${WORK_PATH}/logs"

# 确保日志目录存在
mkdir -p ${im_client_path}

function start() {
echo "Starting IM-Client..."
# 移除 nohup 和 &,让程序在前台运行,这样你才能用 Scanner 输入,并且显示指定 logback.xml
java ${JVM} -Dlogging.config=${WORK_PATH}/config/logback.xml -jar ${WORK_PATH}/lib/${JAR_NAME} --spring.config.location=${WORK_PATH}/config/application.properties
}

function stop() {
pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}')
if [ -n "$pid" ]; then
echo "Stopping IM-Client (pid: $pid)..."
kill -15 "$pid" # 建议先用 15 触发 @PreDestroy 优雅关闭
sleep 3
# 如果还没关掉再强制关闭
kill -9 "$pid" 2>/dev/null
else
echo "${JAR_NAME} is not running."
fi
}

function status() {
pid=$(ps -ef | grep ${JAR_NAME} | grep -v grep | awk '{print $2}')
if [ -n "$pid" ]; then
echo "${JAR_NAME} is running, PID is $pid"
else
echo "${JAR_NAME} is stopped"
fi
}

case "$1" in
start) start ;;
stop) stop ;;
restart) stop; start ;;
status) status ;;
*) echo "Usage: $0 {start|stop|restart|status}" ;;
esac


如何使用

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ tar -zxvf im-server-Alpha.tar.gz
$ cd im-server
$ tree
.
├── bin
│   └── start.sh
├── config
│   ├── application.properties
│   └── logback.xml
├── lib
│ └── im-server.jar
└── logs
   ├── im-server.log
   └── console.log

$ bin/start.sh start
Starting IM-Server...
Service started, check logs at xxx/im-server/logs/console.log

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ bin/start.sh start
Starting IM-Client...
2026-04-18 20:37:23.685 INFO --- [ main] com.owlias.im.client.ClientApp LN:59 : Started ClientApp in 1.156 seconds (process running for 2.001)
请输入某个操作指令:[menu] 0->show 所有命令 | 1->登录 | 2->聊天 | 10->退出
1
请输入用户信息(id@password)
zhangsan@111
2026-04-18 20:42:20.080 INFO --- [ntLoopGroup-2-1] c.o.im.client.session.ClientSession LN:49 : 登录成功
2026-04-18 20:42:20.081 INFO --- [ntLoopGroup-2-1] c.o.i.c.handler.LoginResponseHandler LN:65 : 登录成功,业务处理器已就绪
请输入某个操作指令:[menu] 0->show 所有命令 | 1->登录 | 2->聊天 | 10->退出 |
2
请输入聊天的消息(id:message):请输入聊天的消息(id:message):
zhangsan:hello zhangsan
收到消息 from uid:zhangsan -> hello zhangsan
...


分布式IM需要解决的核心问题

问题描述

假如服务端是由三个节点1、2、3组成的,通常情况下我们理解的 “客户端A连接节点1,连上了就一直与节点1通信,不存在A的消息发送到节点2会3的情况”,这种理解是错误的,而且分布式场景下我们也要考虑端对端的各种通信路由,比如:

  • 物理链路断开后的“重连漂移”:这是最常见的情况。用户 A 原本连着节点 1,但突然 A 经过了一个隧道,手机信号切换(或者节点1负载过高踢掉 A)。这个时候,A的物理连接断开了,A客户端逻辑自动发起断线重连,负载均衡器(Nginx/LVS)发现节点1很忙,就把这次新的连接请求分配给了节点2。结果A就在节点2上 “安家”了。如果此时有人给 A 发消息,消息就必须去节点2找他。
  • 发送者与接收者不在同一个节点:这是分布式 IM 的本质场景。用户 A 连在节点 1,他的好朋友用户 B 连在节点 2,用户 B 想给用户 A 发一句“你好”。B 的消息首先物理到达节点 2,节点 2 检查本地内存,发现 A 不在自己这里,节点 2 必须把这条消息 “跨机” 送到节点 1。虽然 A 始终只跟节点 1 说话,但 A 要接收的消息,最初是出现在节点 2 上的。
  • 多端登录与信令路由:用户 A 同时用电脑(节点 1)和手机(节点 2)登录,有人给 A 发了一条消息,服务端必须保证 A 的两个设备都能收到。无论系统决定先处理哪端的逻辑,消息都不可避免地要在两个节点之间同步。

分布式的复杂性在于:发送者可能在任何地方。 只要用户 B 的物理连接落在了节点 2,那么他发给 A 的每一句话,第一站都是节点 2。节点 2 就像一个快递中转站,必须负责把消息发往 A 所在的 “片区”(节点 1)。这就使得我们在分布式情况下,必须考虑 “跨节点路由” 的本质原因。


业界做法

在实际工业界,针对千万级甚至亿级日活的场景(如腾讯、阿里、字节的 IM 架构),最好的解决方案通常是 “接入与逻辑彻底分离 + 全局状态网格”。这种架构被称为 Gateway-Logic 架构。这种架构解决了 “消息中转” 问题:

  • 解耦:业务员(Logic)不需要知道接线员(Gateway)的具体 IP,只需要查询 Redis 路由。
  • 性能:Logic 层可以根据计算需求独立扩容,而 Gateway 只负责搬运字节。
  • 可靠性:即使一个 Logic 节点挂了,MQ 的消息还在,其他 Logic 节点接手后依然能通过路由找到 Gateway 里的 A。

三层分离架构模型

为了解决 “重连漂移” 和 “跨机投递”,我们通常不会让 Netty 节点既当接线员又当业务员。

  • 第一层:接入层 (Gateway / Connector)

    • 职责:只负责维持 TCP/WebSocket 长连接,处理 TLS 加密、心跳检测和字节流编解码。
    • 特性:有状态。它知道 A 在哪条物理线路上,但它不知道 A 是谁(不关心业务)。
    • 稳定性:代码极简,几乎不更新,保证连接不断。
  • 第二层:逻辑层 (Logic / Center)

    • 职责:处理登录、好友关系、消息持久化、敏感词过滤、多端同步。
    • 特性:无状态。不持有任何 TCP 连接,通过 RPC(如 gRPC, Dubbo)与接入层通信。
    • 灵活性:可以随时水平扩容或重启,不影响用户连接。
  • 第三层:存储与路由层 (Routing / State)

    • 职责:记录 “UID -> Gateway_ID” 的映射关系。
    • 组件:通常使用 Redis 集群或分布式一致性存储(如 Etcd)。

在分布式 IM 的世界里,地址簿(Routing Table) 就是灵魂。无论客户端漂移到哪个节点,只要地址簿是实时更新的,消息就能通过这套 “中转体系”准确送达。我们的单机版IM如果想要改造成分布式版,第一步就是将 SessionMap 从本地内存移到 Redis 中,并让服务端具备 “根据 Redis 地址转发消息” 的能力。


三层架构的具体实现

场景一:跨节点路由(A在节点1,B在节点2)—— MQ 扇出模式

  • 最佳实践:消息队列 (MQ) + 路由索引。
  • 具体流程:
    • B 发消息给 A,请求到达网关 2。
    • 网关 2 把请求扔给逻辑层的随机一个节点。
    • 逻辑层查 Redis 路由表,发现 UserA 在 Gateway_1。
    • 逻辑层将消息投递到 MQ 的某个 Exchange,或者直接 RPC 调用网关 1的推送接口。
    • 网关 1 收到指令,从内存里找出 A 的 Channel,推下去。

场景二:多端登录(电脑在节点1,手机在节点2)—— 双向 Diff 同步

  • 最佳实践:逻辑层多点投递 + 读扩散/写扩散优化。
  • 具体流程:
    • 逻辑层发现 A 有两个活跃网关连接。
    • 逻辑层并行给网关 1 和网关 2 发送推送指令。

场景三:重连漂移 —— Session 抢占与清理

  • 最佳实践:分布式 Session 锁。
  • 具体流程:
    • A 连接到网关 2。
    • 网关 2 告诉逻辑层:“A 在我这登录了”。
    • 逻辑层更新 Redis 路由为 “A -> Gateway_2”,并发现 A 之前在 “Gateway_1”。
    • 逻辑层向 Gateway_1 发起 “踢人” 指令。Gateway_1 收到后断开旧连接。

另外,为了保证在复杂的网络环境下消息的可靠性,我们主动引入 SeqId 机制。SeqId 是一个用户维度单调递增的数字,通常是64位长整型。服务端会在数据库里记录发送给 A 的当前最大 ID 是多少,比如 max_seq = 100。要理解序列号ID,我们得先抛弃 “服务端直接把消息推给客户端就完事了” 的简单想法。在不稳定的移动网络下,单靠服务端推送(Push)是无法保证消息不丢(网络闪断没收到)和不乱(后发的先到了)的。

  • 情景一:假设用户 B 连续给 A 发了两条消息:“你好”(Seq: 101)、“在吗?”(Seq: 102)。由于网络路径不同,Seq 102 可能会比 Seq 101先到达 A 的手机。如果没有 Seq ID,A 看到的对话顺序会变成 “在吗?— 你好”,逻辑全乱了。有了 Seq ID,客户端 A 收到 102 时,发现本地最大 ID 还是 100,它知道中间漏了 101。此时客户端可以暂时不显示 102,或者在界面上按 ID 重新排序。
  • 情景二:当客户端 A 收到 Seq 为 105 的消息,但本地最后一条消息是 102 时,客户端立刻意识到:中间的 103 和 104 丢了!此时客户端会自动向服务端发起 PullRequest(start=103, end=104) 请求,把丢失的消息补回来。
  • 情景三:当用户 A 经过隧道重新连上节点 2 时:客户端A上报告诉服务器:“我本地最后一条消息 ID 是 105”。服务器查数据库发现:“发给 A 的最大 ID 已经是 110 了”。服务器不再等待 A 说话,直接把 106 到 110 的消息打包推给 A。


技术选型建议


实际完整架构

实际的架构通常如下:

  • 接入层(Netty 集群):开发者通常手动编写基于 Netty 的 Spring Boot 程序。每个节点会暴露两个接口:
    • 面向用户:TCP/WebSocket 端口(如 8081),维持长连接。
    • 面向内部:RPC 端口(如 Dubbo/gRPC),让逻辑层能找回来。
  • 逻辑层(Spring Cloud 微服务集群):使用 Spring Cloud 体系(Nacos, Sentinel, OpenFeign)。
    • 当一个消息进来,接入层通过 Spring Cloud 的负载均衡算法,随机挑选一个逻辑节点处理业务。
  • 状态网格(Redis + MQ):这是连接两层的纽带。
    • Redis:存储 “UID -> Gateway_IP”。逻辑层处理完业务,查 Redis 知道该发往哪个网关。
    • RocketMQ/RabbitMQ:逻辑层将推送指令发给 MQ,各个网关节点订阅自己的队列。网关层和逻辑层异步解耦。

为什么不直接用 Spring Cloud Gateway 做接入层呢?虽然 Spring Cloud Gateway 也是基于 Netty 的(Reactor Netty),但 IM 系统有几个特殊需求:

  • 私有协议支持:我们需要处理自定义的 Protobuf。
  • 内存占用极致优化:接入层节点需要承载百万级连接,应手动控制 Netty 的 ByteBuf 申请和释放(池化)。
  • 心跳策略定制:IM 的心跳往往带有业务属性(如多端同步位点),需要深度控制 Pipeline。